package org.topteam.oschat.akka;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.websocket.Session;

import org.topteam.oschat.entity.User;
import org.topteam.oschat.event.SessionEvent;
import org.topteam.oschat.event.UserEvent;
import org.topteam.oschat.event.SessionEvent.SessionClose;
import org.topteam.oschat.event.SessionEvent.SessionOpen;
import org.topteam.oschat.event.UserEvent.Login;
import org.topteam.oschat.message.Data;
import org.topteam.oschat.message.TextMessage;
import org.topteam.oschat.service.IUserService;
import org.topteam.oschat.service.UserStatue;
import org.topteam.oschat.util.SpringUtils;

import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import akka.cluster.routing.AdaptiveLoadBalancingGroup;
import akka.cluster.routing.ClusterRouterGroup;
import akka.cluster.routing.ClusterRouterGroupSettings;
import akka.cluster.routing.HeapMetricsSelector;
import akka.contrib.pattern.ClusterSingletonProxy;

/**
 * UserActor负责用户的所有消息，事件的处理。 每一个在线用户会对应一个UserActor。
 * 
 * @author JiangFeng
 * 
 */
public class UserActor extends UntypedActor {

	/**
	 * 当前用户开启的所有会话连接
	 */
	private Map<String, Session> sessions = new HashMap<String, Session>();

	/**
	 * 当前用户对象
	 */
	private User user;

	private IUserService userService;

	public UserActor(User user) {
		this.user = user;
		userService = SpringUtils.getBean(IUserService.class);
	}

	/**
	 * 所有事件、消息处理
	 * <ul>
	 * <li>打开WebSocket连接
	 * {@link org.topteam.oschat.event.SessionEvent.SessionOpen}</li>
	 * <li>关闭WebSocket连接
	 * {@link org.topteam.oschat.event.SessionEvent.SessionClose}</li>
	 * <li>失去WebSocket主连接
	 * {@link org.topteam.oschat.event.SessionEvent.LostConnection}</li>
	 * <li>用户登录事件{@link org.topteam.oschat.event.UserEvent.Login}</li>
	 * <li>用户登出事件{@link org.topteam.oschat.event.UserEvent.Logout}</li>
	 * </ul>
	 */
	@Override
	public void onReceive(Object message) throws Exception {
		if (message instanceof SessionEvent.SessionOpen) {
			SessionOpen open = (SessionOpen) message;
			sessions.put(open.getWithId(), open.getSession());
			if (user.getUserId().equals(open.getWithId())) {
				Login loginEvent = new Login(user);
				selfLoginUpdate(loginEvent);
			}
		} else if (message instanceof SessionEvent.SessionClose) {
			SessionClose close = (SessionClose) message;
			sessions.remove(close.getWithId());
			if (close.getUserId().equals(close.getWithId())) {
				getSelf().tell(new SessionEvent.LostConnection(), getSelf());
				getSender().tell(new UserEvent.Logout(user), getSelf());
			}
		} else if (message instanceof SessionEvent.LostConnection) {
			for (Session session : sessions.values()) {
				session.close();
			}
			sessions.clear();
			getContext().stop(getSelf());
		} else if (message instanceof Login) {
			Login login = (Login) message;
			sessions.get(user.getUserId()).getAsyncRemote()
					.sendObject(Data.buildEventMessage(login));
		} else if (message instanceof TextMessage) {
			TextMessage msg = (TextMessage) message;
			Session session = sessions.get(msg.getFrom());
			if (session == null) {
				session = getSelfSession();
			}
			session.getAsyncRemote().sendObject(msg);
		}
	}

	private void selfLoginUpdate(Login loginEvent) {
		getSelfSession().getAsyncRemote().sendObject(
				Data.buildEventMessage(loginEvent));
		userService.updateStat(user.getUserId(), UserStatue.ON_LINE);
		tellOnlineFirends(loginEvent);
	}

	private void tellOnlineFirends(Object o) {
		List<String> onlineFirends = userService.getOnlineFirends(user
				.getUserId());
		for (String u : onlineFirends) {
			ActorRef uActor = getRemoteActor(u);
			uActor.tell(o, getSelf());
		}
	}

	private Session getSelfSession() {
		return sessions.get(user.getUserId());
	}

	private ActorRef getRemoteActor(String id) {
		// int totalInstances = 100;
		// Iterable<String> routeesPaths = Arrays.asList("/user/" + id);
		// boolean allowLocalRoutees = true;
		// ClusterRouterGroup clusterRouterGroup = new ClusterRouterGroup(
		// new AdaptiveLoadBalancingGroup(
		// HeapMetricsSelector.getInstance(),
		// Collections.<String> emptyList()),
		// new ClusterRouterGroupSettings(totalInstances, routeesPaths,
		// allowLocalRoutees, "oschat"));
		// ActorRef remoteActor = getContext().system().actorOf(
		// clusterRouterGroup.props());
		//
		ActorRef remoteActor = getContext().actorOf(
				ClusterSingletonProxy.defaultProps("user/" + id + "/" + id,
						"oschat"));
		return remoteActor;
	}
}
